
#include <raft>
#include <raftio>

#include "kernels/microflow_reader.h"
#include "kernels/clfr_reader.h"
#include "kernels/clfr_counter.h"

// Apps.
// Profiler.
#include "kernels/host_timing_profiler.h"

// Feature calculators.
#include "kernels/rich_feature_calculator.h"
#include "kernels/netflow_feature_calculator.h"
#include "kernels/pfe_feature_calculator.h"
#include "kernels/allstats_feature_calculator.h"

// Micro-burst detector.
#include "kernels/microburst_detector.h"


// Utilities.
#include "kernels/cloner.h"

#include "kernels/clfr_counter_chain.h"

#include "kernels/clfr_counter_replicated.h"

#include "kernels/benchmark_printer.h"


#include <cstdint>
#include <iostream>
#include <fstream>
#include <string>



void runSimpleCounter();
void runHostProfiler();
void runClassifiers();
void runMicroburstDetector();

void runClones(int N);
void runChains(int N);
void runReplicas(int N);

void benchmarkReplicas();

int main(int argc, char** argv)
{
  benchmarkReplicas();
  return 0;
  runSimpleCounter();
  return 0;
  runClones(1);
  return 0;
  benchmarkReplicas();

  // for (int i = 1; i<=64; i = i*2){
  //   std::cout << "testing " << i << " chain" << endl;
  //   std::cout << " ------------ " << endl;
  //   runChains(i);
  //   std::cout << " ------------ " << endl;
  // }

  // for (int i = 1; i!=32; i = i*2){
  //   std::cout << "testing " << i << " clones" << endl;
  //   std::cout << " ------------ " << endl;
  //   runClones(i);
  //   std::cout << " ------------ " << endl;
  // }
  return 0;
}


void runSimpleCounter(){
  raft::map m;
  std::cout << "initializing kernels." << endl;
  // The kernel to read CLFRs. Emits CLFRs generated by converter.
  starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/caida2015_02_dirA.mCLFRs.bin.clfrs");
  // starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/mCLFRs.32.bin.clfrs");  
  starflow::kernels::ClfrCounter<starflow::kernels::MicroflowReader::output_t> counter;
  m += reader >> counter;
  m.exe();
  return;
}


void runHostProfiler(){
  raft::map m;
  std::cout << "initializing kernels." << endl;
  starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/caida2015_02_dirA.mCLFRs.bin.clfrs");
  starflow::kernels::HostProfiler<starflow::kernels::MicroflowReader::output_t> profiler("/home/jsonch/gits/starflow_analytics/outputs/timingProfile.bin", 5 * 60 * 1000);
  m += reader >> profiler;  
  m.exe();
  return;
}
void runClassifiers(){
  raft::map m;
  std::cout << "initializing kernels." << endl;
  starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/caida2015_02_dirA.mCLFRs.bin.clfrs");
  // Compute clfr features. 
  starflow::kernels::FeatureCalculator<starflow::kernels::ClfrReader::output_t> calculator;

  // Compute netflow features.
  // starflow::kernels::NetFlowFeatureCalculator<starflow::kernels::ClfrReader::output_t> calculator;

  // Compute PFE aggregatable features.
  // starflow::kernels::PFEFeatureCalculator<starflow::kernels::ClfrReader::output_t> calculator;

  // Compute complex aggregated stat features.
  // starflow::kernels::AllStatsFeatureCalculator<starflow::kernels::ClfrReader::output_t> calculator;
  // m += reader >> calculator;  
  m += reader >> calculator;  
  m.exe();

}
void runMicroburstDetector(){
  raft::map m;
  std::cout << "initializing kernels." << endl;
  starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/microburst.clfrs.bin");

  starflow::kernels::MicroburstDetector<starflow::kernels::ClfrReader::output_t> detector;
  m += reader >> detector;  
  m.exe();
}

void runClones(int N){
  raft::map m;
  starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/mCLFRs.32.bin.clfrs");  

  // Clone CLFRs to multiple counters.
  starflow::kernels::ClfrCloner<starflow::kernels::MicroflowReader::output_t> cloner(N);
  m += reader >> cloner;
  starflow::kernels::ClfrCounter<starflow::kernels::MicroflowReader::output_t> counters[N];
  for (int i = 0; i<N; i++){
    m += cloner[std::to_string(i)] >> counters[i];
  }
  std::cout << "executing kernels." << endl;
  m.exe();
}


void runChains(int N){
  starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/caida2015_02_dirA.mCLFRs.bin.clfrs");
  // starflow::kernels::ClfrReader reader("/home/jsonch/gits/starflow_analytics/inputs/mCLFRs.32.bin.clfrs");  
  std::cout << "N: " << N << std::endl;
  if (N > 1){
    raft::map m;
    starflow::kernels::ClfrCounterChain<starflow::kernels::MicroflowReader::output_t> counters[N-1];
    m += reader >> counters[0];
    for (int i = 0; i<(N-2); i++){
      m += counters[i] >> counters[i+1];
    }
    starflow::kernels::ClfrCounter<starflow::kernels::MicroflowReader::output_t> counter;
    m += counters[N-2] >> counter;
    std::cout << "executing kernels." << endl;
    m.exe();
  }
  else{
    raft::map m;
    starflow::kernels::ClfrCounter<starflow::kernels::MicroflowReader::output_t> counter;
    m += reader >> counter;    
    std::cout << "executing kernels." << endl;
    m.exe();
  }
}

void runReplicas(int N){
  raft::map m;  

  starflow::kernels::ClfrReader * readers[N];
  // starflow::kernels::ClfrCounterReplicated<starflow::kernels::MicroflowReader::output_t> * counters[N];
  // starflow::kernels::NetFlowFeatureCalculator<starflow::kernels::MicroflowReader::output_t> * sinks[N];
  starflow::kernels::MicroburstDetector<starflow::kernels::MicroflowReader::output_t> * sinks[N];


  starflow::kernels::BenchmarkPrinter<double> logger(N);

  for (int i=0; i<N; i++){
    readers[i] = new starflow::kernels::ClfrReader("/home/jsonch/gits/starflow_analytics/inputs/microburst.clfrs.bin");
    // readers[i] = new starflow::kernels::ClfrReader("/home/jsonch/gits/starflow_analytics/inputs/mCLFRs.32.bin.clfrs");
    // counters[i] = new starflow::kernels::ClfrCounterReplicated<starflow::kernels::MicroflowReader::output_t>(i);

    sinks[i] = new starflow::kernels::MicroburstDetector<starflow::kernels::MicroflowReader::output_t>;
    m += *(readers[i]) >> *(sinks[i]);
    m += *(sinks[i]) >> logger[std::to_string(i)];
  }

  m.exe();  

}


void benchmarkReplicas(){
  for (int i = 1; i<=16; i = i*2){
    std::cout << "testing " << i << " replicas" << endl;
    std::cout << " ------------ " << endl;
    runReplicas(i);
    std::cout << " ------------ " << endl;
  }

}
